package com.faner4cloud.yun.coustom;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * @author luck(23092676)
 * @version v1
 *
 * @since 2020-04-06 15:32
 */
@Slf4j
@Component
public class BaseConsumer {

    /**
     * 表驱动
     */
    private final Map<Integer, Consumer<KafkaBaseDto>> dealConsumerMap = new HashMap<>();
    @Resource
    private AtaConsumerService ataConsumerService;
    @Resource
    private AtaExamConsumerService ataExamConsumerService;


    /**
     * @author luck(23092676)
     * @version v1
     * @summary 初始化数据
     * @since 2020-04-23 10:13
     */
    @PostConstruct
    public void init() {
		log.info("#处于docker环境#允许写入消费逻辑");
		dealConsumerMap.put(KafkaBaseDtoTypeEnum.ATA逻辑处理.getCode(), ataHandle);
		dealConsumerMap.put(KafkaBaseDtoTypeEnum.准考证打印通知处理.getCode(), ataAdmissionTicket);
    }


    public void onConsumer(String message) {
        log.info("收到消息baseConsumer,msg:{}", message);
        KafkaBaseDto kafkaBaseDto;
        try {
            kafkaBaseDto = JSONUtil.parse(message).toBean(KafkaBaseDto.class);
        } catch (Exception e) {
            log.info("收到消息baseConsumer,异常;消息格式不正确:{},", message, e);
            return;
        }
        try {
			//消费dealConsumerMap
			dealConsumerMap.get(kafkaBaseDto.getType()).accept(kafkaBaseDto);
        } catch (Exception e) {
            log.info("收到消息baseConsumer，异常处理消息失败{}", message, e);
        }
    }

    /**
     * ATA预约消息处理
     * <p>
     * 需要进行相关数据刷新
     * 需要刷新报名数据
     */
    private final Consumer<KafkaBaseDto> ataHandle = kafkaBaseDto -> {
        try {
            log.info("ATA逻辑处理kafkaBaseDto.getData():{}", kafkaBaseDto.getData());
            KafkaAtaOpsDto kafkaAtaOpsDto = JSONUtil.parse(kafkaBaseDto.getData()).toBean(KafkaAtaOpsDto.class);
            log.info("ATA逻辑处理consumer:kafkaAtaOpsDtoV1{}", kafkaAtaOpsDto);
            Consumer<KafkaAtaOpsDto> consumer = ataConsumerService.getDealConsumerMap().get(kafkaAtaOpsDto.getOpsType());
            log.info("ATA逻辑处理consumer:consumer{}", consumer);
            if (Objects.nonNull(consumer)) {
                consumer.accept(kafkaAtaOpsDto);
            } else {
                log.info("#ataHandle#未找到对应consumer:{}", kafkaBaseDto.getData());
            }
        } catch (Exception e) {
            log.error("[ATA预约]收到消息[{}],异常;不是本次需要的消息格式:{},", KafkaBaseDtoTypeEnum.ATA逻辑处理.name(), kafkaBaseDto.getData(), e);
        }
    };

    /**
     * 打印准考证
     */
    private final Consumer<KafkaBaseDto> ataAdmissionTicket = kafkaBaseDto -> {
        try {
            log.info("打印准考证ataAdmissionTicket - kafkaBaseDto.getData():{}", kafkaBaseDto.getData());
            KafkaAtaOpsDto kafkaAtaOpsDto = JSONUtil.parse(kafkaBaseDto.getData()).toBean(KafkaAtaOpsDto.class);
            log.info("打印准考证ataAdmissionTicket - consumer:kafkaAtaOpsDtoV1{}", kafkaAtaOpsDto);
            Consumer<KafkaAtaOpsDto> consumer = ataExamConsumerService.getDealConsumerMap().get(kafkaAtaOpsDto.getOpsType());
            log.info("打印准考证ataAdmissionTicket - consumer:consumer{}", consumer);
            if (Objects.nonNull(consumer)) {
                consumer.accept(kafkaAtaOpsDto);
            } else {
                log.info("#ataHandle#未找到对应consumer:{}", kafkaBaseDto.getData());
            }
        } catch (Exception e) {
            log.error("[打印准考证]收到消息[{}],异常;不是本次需要的消息格式:{},", KafkaBaseDtoTypeEnum.准考证打印通知处理.name(), kafkaBaseDto.getData(), e);
        }
    };
}
